//
// Created by pulsarv on 19-12-6.
//

#include <cstring>
#include <memory>
#include <utility>
#include <vector>

#include <task_manager.h>
#include <src/common/base/slog.hpp>
#include <src/include/data_struct.h>


namespace MobileSearch {
    // Guards exclusive access to the shared video capture source.
    boost::mutex video_io_mutex;
    // Serializes the capture loop in VideoThread::video_thread() against stop().
    boost::mutex camera_thread_mutex;
    // Run flag for the capture loop.
    // NOTE(review): read and written from multiple threads without
    // synchronization — presumably should be std::atomic<bool>; confirm.
    bool start_camera_flag=false;

    // Promote the weak reference and enqueue the task. If the Worker has
    // already been destroyed, the shared_ptr converting constructor throws
    // std::bad_weak_ptr and the task is silently dropped.
    void tryPush(const std::weak_ptr<Worker> &worker, std::shared_ptr<Task> &&task) {
        try {
            auto lockedWorker = std::shared_ptr<Worker>(worker);
            lockedWorker->push(task);
        } catch (const std::bad_weak_ptr &) {
            // Worker is gone — nothing to do.
        }
    }


    // Attempt to acquire the shared video-source mutex without blocking.
    // Returns true when the lock was obtained.
    bool try_lock_sources() {
        const bool acquired = video_io_mutex.try_lock();
        return acquired;
    }

    // Block until exclusive access to the shared video source is acquired.
    void lock_sources() {
        video_io_mutex.lock();
    }

    // Release the shared video-source mutex. boost::mutex::unlock() is not
    // expected to throw; the guard is kept to preserve the original
    // best-effort behavior: a failure is logged and swallowed.
    void unlock_sources() {
        try {
            video_io_mutex.unlock();
        } catch (...) {
            // Fixed log-message typo ("faild" -> "failed").
            slog::warn << "Unlock sources failed" << slog::endl;
        }
    }

    // Construct a pool owning `threadNum` default-constructed (not yet
    // started) threads; call runThreads() to start them.
    Worker::Worker(unsigned threadNum)
            : threadPull(threadNum),
              running(false) {}

    // Shut the pool down and wait for its threads.
    // Joining here prevents std::terminate(): destroying a joinable
    // std::thread aborts the process. Threads exit once stop() clears
    // `running` and notifies the condition variable. If join() was already
    // called, the threads are no longer joinable and the loop is a no-op.
    Worker::~Worker() {
        stop();
        for (std::thread &t : threadPull) {
            if (t.joinable()) {
                t.join();
            }
        }
    }

    // Start one thread per slot in the pool. The run flag is raised before
    // spawning so threadFunc's loop condition observes running == true.
    void Worker::runThreads() {
        running = true;
        for (std::size_t i = 0; i < threadPull.size(); ++i) {
            threadPull[i] = std::thread(&Worker::threadFunc, this);
        }
    }

    void Worker::push(std::shared_ptr<Task> task) {
        tasksMutex.lock();
        tasks.insert(task);
        tasksMutex.unlock();
        tasksCondVar.notify_one();
    }

    // Pool thread main loop: wait for tasks, pick the first one that reports
    // isReady(), and run it outside the queue lock.
    void Worker::threadFunc() {
        while (running) {
            std::unique_lock<std::mutex> lk(tasksMutex);
            // Sleep until a task arrives or stop() is requested; the
            // predicate is re-checked after every (possibly spurious) wakeup.
            while (running && tasks.empty()) {
                tasksCondVar.wait(lk);
            }
            try {
                // Select any queued task whose isReady() is true. isReady()
                // runs while tasksMutex is held — several task types
                // (InferTask, Reader) rely on that to hand a locked mutex
                // over to their process().
                auto it = std::find_if(tasks.begin(), tasks.end(),
                                       [](const std::shared_ptr<Task> &task) { return task->isReady(); });
                if (tasks.end() != it) {
                    const std::shared_ptr<Task> task = *it;
                    tasks.erase(it);
                    // Release the queue lock before running the task so other
                    // pool threads can keep dispatching.
                    lk.unlock();
                    task->process();
                }
                // NOTE(review): if tasks are queued but none is ready, the
                // loop re-acquires the lock immediately — effectively a busy
                // wait until some task becomes ready.
            } catch (...) {
                // Record only the first exception; stop() winds the pool down
                // and join() rethrows it on the caller's thread.
                std::lock_guard<std::mutex> lock{excpetionMutex};
                if (nullptr == currentException) {
                    currentException = std::current_exception();
                    stop();
                }
            }
        }
    }

    // Request shutdown: clear the run flag and wake every thread blocked on
    // the condition variable so threadFunc can observe it.
    // NOTE(review): `running` is written without holding tasksMutex; if it is
    // a plain bool (not std::atomic) there is a potential lost wakeup between
    // threadFunc's predicate check and its wait() — confirm the member's
    // declaration. Taking tasksMutex here is not safe as-is, because stop()
    // is also invoked from threadFunc's catch block, which can still hold
    // that lock.
    void Worker::stop() {
        running = false;
        tasksCondVar.notify_all();
    }

    // Wait for every pool thread to finish, then surface the first exception
    // captured by a pool thread (if any).
    // The joinable() guard makes join() safe to call even if runThreads()
    // was never invoked, or if join() is called twice: calling join() on a
    // non-joinable std::thread throws std::system_error.
    void Worker::join() {
        for (auto &t : threadPull) {
            if (t.joinable()) {
                t.join();
            }
        }
        if (nullptr != currentException) {
            std::rethrow_exception(currentException);
        }
    }

    // Construct the manager over the given camera set. The vector is taken
    // by value and moved into the member, so callers can hand over
    // temporaries without an extra copy.
    TaskManager::TaskManager(std::vector<camera_t> cameras)
            : cameras(std::move(cameras)) {}

    // Log teardown of the manager (no owned resources are released here).
    TaskManager::~TaskManager() {
        // Fixed log-message typo ("Destory" -> "Destroy").
        slog::success << "Destroy Task Manager" << slog::endl;
    }

    // Remove each camera's shared-memory message queue so a stale queue from
    // a previous run does not block re-creation in run_worker().
    void TaskManager::clear_message_queue() {
        for (const camera_t &camera : cameras) {
            try {
                // message_queue::remove() reports failure via its bool return
                // value rather than throwing, so check it instead of logging
                // success unconditionally.
                if (boost::interprocess::message_queue::remove(camera.message_queue.c_str())) {
                    slog::success << "Clear " << camera.message_queue << " Success" << slog::endl;
                } else {
                    slog::warn << "Clear " << camera.message_queue << " error" << slog::endl;
                }
            } catch (boost::interprocess::interprocess_exception &e) {
                slog::warn << "Clear " << camera.message_queue << " error" << slog::endl;
            }
        }
    }


    // Ask the (most recently created) video thread to stop capturing.
    // NOTE(review): videoThread is dereferenced unconditionally — if
    // run_worker() was never called, or failed before assigning it, this is
    // undefined behavior. Confirm the member is null-initialized and guard.
    void TaskManager::stop_worker() {
        videoThread->stop();
    }

    // Create one shared-memory message queue per camera and start a capture
    // thread publishing frames into it.
    void TaskManager::run_worker() {

        // Release both coordination mutexes so the capture loop can run.
        // NOTE(review): unlocking a mutex this thread may not own is
        // undefined behavior for boost::mutex — confirm the lock/unlock
        // pairing with VideoThread::stop().
        unlock_sources();
        camera_thread_mutex.unlock();

        for (const camera_t &camera:cameras) {
            try {
                // create_only throws if the queue already exists (e.g. it was
                // not removed by clear_message_queue()).
                boost::interprocess::message_queue *mobilesearch_video_message_queue = new boost::interprocess::message_queue(
                        boost::interprocess::create_only,
                        camera.message_queue.c_str(),
                        MAX_QUEUE_SIZE,
                        DATA_SIZE
                );
                slog::success << "Create " << camera.message_queue << " Success" << slog::endl;
                start_camera_flag=true;
                // The shared_ptr takes ownership of the queue allocated above.
                boost::shared_ptr<boost::interprocess::message_queue> p_mobilesearch_video_message_queue(
                        mobilesearch_video_message_queue);
                // NOTE(review): videoThread is overwritten on every iteration
                // and never deleted — with multiple cameras the previous
                // VideoThread object leaks.
                videoThread = new VideoThread(camera);
                videoThread->run(p_mobilesearch_video_message_queue);
            } catch (boost::interprocess::interprocess_exception &e) {
                slog::warn << "Create " << camera.message_queue << " Error" << slog::endl;
            }
        }
    }


    // Store the camera description. camera_t arrives by value, so moving it
    // into the member avoids a second copy.
    VideoThread::VideoThread(camera_t camera)
            : camera(std::move(camera)) {}

    // Halt the capture loop: acquire both mutexes the loop contends on and
    // clear the global run flag so video_thread() exits.
    void VideoThread::stop() {
        slog::info << "Close Camera:" << camera.index << slog::endl;
        camera_thread_mutex.lock();
        lock_sources();
        // NOTE(review): start_camera_flag is a plain bool shared with the
        // capture thread — consider std::atomic<bool>.
        start_camera_flag=false;
    }

    // Launch the capture loop on its own detached boost thread, publishing
    // frames into the given message queue.
    void VideoThread::run(boost::shared_ptr<boost::interprocess::message_queue> &p_mobilesearch_video_message_queue) {
        slog::info << "Start Camera:" << camera.index << " Thread" << slog::endl;
        // NOTE(review): the boost::thread object allocated here is never
        // deleted; after detach() only the small handle leaks, not the
        // thread itself.
        v_thread = new boost::thread(video_thread, camera.index, p_mobilesearch_video_message_queue);
        v_thread->detach();
    }

    // Capture loop (runs on a detached boost thread): grab frames from the
    // camera and publish them as data_package_t messages while
    // start_camera_flag is set and the queue has room.
    void VideoThread::video_thread(int &camera_index,
                                   const boost::shared_ptr<boost::interprocess::message_queue> &p_mobilesearch_video_message_queue) {
        cv::VideoCapture cap(camera_index);
        if (cap.isOpened()) {
            slog::success << "Camera Index:" << camera_index << " Opened" << slog::endl;
            while (start_camera_flag) {
                // Yield to stop(): while stop() holds camera_thread_mutex we
                // spin until it is released.
                if (!camera_thread_mutex.try_lock())continue;

                try {
                    if (try_lock_sources() and (p_mobilesearch_video_message_queue->get_num_msg() < MAX_QUEUE_SIZE)) {
                        cv::Mat frame;
                        cap.read(frame);
                        // RAII buffer replaces the previous `new char[]` that
                        // was released with free() (undefined behavior for
                        // new[]-allocated memory) and leaked when send()
                        // threw before the release.
                        std::vector<char> package_frame_buffer(PACKAGE_SIZE);
                        data_package_t dataPackage;
                        dataPackage.data_type=PIP_LINE;
                        memset(dataPackage.data, 0, IMAGE_BUFFERSIZE);
                        // NOTE(review): assumes frame.data is continuous and
                        // cols*rows*channels fits IMAGE_BUFFERSIZE — confirm
                        // against the configured camera resolution.
                        memcpy(dataPackage.data, frame.data, frame.cols * frame.rows * frame.channels());
                        memcpy(package_frame_buffer.data(), &dataPackage, sizeof(data_package_t));
                        unsigned int priority = 0;
                        p_mobilesearch_video_message_queue->send(package_frame_buffer.data(), PACKAGE_SIZE, priority);
                    }
                    unlock_sources();

                } catch (boost::interprocess::interprocess_exception &e) {
                    slog::warn << "Send Message Queue Error" << slog::endl;
                    // NOTE(review): breaking here leaves camera_thread_mutex
                    // locked; kept to preserve the original control flow.
                    break;
                } catch (std::exception &e) {
                    slog::warn << "Sources Unlock Error" << e.what() << slog::endl;
                }
                camera_thread_mutex.unlock();
            }
        }
        slog::success << "Camera " << camera_index << " Thread Stoped" << slog::endl;
        cap.release();
    }

    // "Reborn" pattern: when the last owner drops this frame, schedule a
    // Reader task for the same source with the next frame id, keeping a
    // constant number of frames in flight through the pipeline.
    ReborningVideoFrame::~ReborningVideoFrame() {
        try {
            // Throws std::bad_weak_ptr if the readers worker is already gone
            // (pipeline shutting down) — then no new frame is scheduled.
            const std::shared_ptr<Worker> &worker = std::shared_ptr<Worker>(context.readersContext.readersWorker);
            context.videoFramesContext.lastFrameIdsMutexes[sourceID].lock();
            const auto frameId = ++context.videoFramesContext.lastframeIds[sourceID];
            context.videoFramesContext.lastFrameIdsMutexes[sourceID].unlock();
            // Reuse this frame's buffer for the next capture.
            std::shared_ptr<ReborningVideoFrame> reborn = std::make_shared<ReborningVideoFrame>(context, sourceID,
                                                                                                frameId,
                                                                                                frame);
            worker->push(std::make_shared<Reader>(reborn));
        } catch (const std::bad_weak_ptr &) {}
    }

    // Gate Drawer tasks so frames are composed and shown in order and no
    // faster than the configured show period.
    bool Drawer::isReady() {
        Context &context = dynamic_cast<ReborningVideoFrame *>(sharedVideoFrame.get())->context;
        std::chrono::steady_clock::time_point prevShow = context.drawersContext.prevShow;
        std::chrono::steady_clock::duration showPeriod = context.drawersContext.showPeriod;
        if (1u == context.drawersContext.gridParam.size()) {
            // Single source: ready once the minimal show period has elapsed.
            return std::chrono::steady_clock::now() - prevShow > showPeriod;
        } else {
            // Multiple sources are composed into grid mats keyed by frame id.
            std::map<int64_t, GridMat> &gridMats = context.drawersContext.gridMats;
            auto gridMatIt = gridMats.find(sharedVideoFrame->frameId);
            if (gridMats.end() == gridMatIt) {
                // No grid for this frame yet: allow creating one only while
                // fewer than two grids are in flight.
                return 2 > gridMats.size();
            } else {
                if (1u == gridMatIt->second.getUnupdatedSourceIDs().size()) {
                    // This frame would complete its grid: only proceed if it
                    // is the next due frame and the show period has elapsed.
                    return context.drawersContext.lastShownframeId == sharedVideoFrame->frameId
                           && std::chrono::steady_clock::now() - prevShow > showPeriod;
                } else {
                    return true;
                }
            }
        }
    }

    // Compose this frame into its grid; when the front grid is complete,
    // overlay usage/FPS stats and display it.
    void Drawer::process() {
        const int64_t frameId = sharedVideoFrame->frameId;
        Context &context = dynamic_cast<ReborningVideoFrame *>(sharedVideoFrame.get())->context;
        std::map<int64_t, GridMat> &gridMats = context.drawersContext.gridMats;
        context.drawersContext.drawerMutex.lock();
        auto gridMatIt = gridMats.find(frameId);
        if (gridMats.end() == gridMatIt) {
            // First cell arriving for this frame id: allocate its grid.
            gridMatIt = gridMats.emplace(frameId, GridMat(context.drawersContext.gridParam,
                                                          context.drawersContext.displayResolution)).first;
        }

        gridMatIt->second.update(sharedVideoFrame->frame, sharedVideoFrame->sourceID);
        auto firstGridIt = gridMats.begin();
        int64_t &lastShownframeId = context.drawersContext.lastShownframeId;
        // Show only when the oldest grid is the next due frame and every
        // source has contributed its cell.
        if (firstGridIt->first == lastShownframeId && firstGridIt->second.isFilled()) {
            lastShownframeId++;
            cv::Mat mat = firstGridIt->second.getMat();

            // Info panel with a detection-InferRequest usage bar.
            constexpr float OPACITY = 0.6f;
            fillROIColor(mat, cv::Rect(5, 5, 390, 115), cv::Scalar(255, 0, 0), OPACITY);
            cv::putText(mat, "Detection InferRequests usage", cv::Point2f(15, 70), cv::FONT_HERSHEY_TRIPLEX, 0.7,
                        cv::Scalar{255, 255, 255});
            cv::Rect usage(15, 90, 370, 20);
            cv::rectangle(mat, usage, {0, 255, 0}, 2);
            uint64_t nireq = context.nireq;
            uint64_t frameCounter = context.frameCounter;
            // Bar width ~ fraction of detector requests currently busy.
            usage.width = static_cast<int>(usage.width *
                                           static_cast<float>(frameCounter * nireq - context.freeDetectionInfersCount) /
                                           (frameCounter * nireq));
            cv::rectangle(mat, usage, {0, 255, 0}, cv::FILLED);

            // Refresh the FPS readout roughly once per second.
            context.drawersContext.framesAfterUpdate++;
            const std::chrono::steady_clock::time_point localT1 = std::chrono::steady_clock::now();
            const Sec timeDuration = localT1 - context.drawersContext.updateTime;
            if (Sec{1} <= timeDuration || context.drawersContext.updateTime == context.t0) {
                context.drawersContext.outThroughput.str("");
                context.drawersContext.outThroughput << std::fixed << std::setprecision(1)
                                                     << static_cast<float>(context.drawersContext.framesAfterUpdate) /
                                                        timeDuration.count() << "FPS";
                context.drawersContext.framesAfterUpdate = 0;
                context.drawersContext.updateTime = localT1;
            }
            cv::putText(mat, context.drawersContext.outThroughput.str(), cv::Point2f(15, 35), cv::FONT_HERSHEY_TRIPLEX,
                        0.7,
                        cv::Scalar{255, 255, 255});

            cv::imshow("Detection results", firstGridIt->second.getMat());
            context.drawersContext.prevShow = std::chrono::steady_clock::now();
            // Esc/q/Q (or end of a non-video input) stops the drawers worker;
            // Space (32) toggles the waitKey delay between 0 and 1 ms.
            const int key = cv::waitKey(context.drawersContext.pause);
            if (key == 27 || 'q' == key || 'Q' == key || !context.isVideo) {
                try {
                    std::shared_ptr<Worker>(context.drawersContext.drawersWorker)->stop();
                } catch (const std::bad_weak_ptr &) {}
            } else if (key == 32) {
                context.drawersContext.pause = (context.drawersContext.pause + 1) & 1;
            }
            // Recycle the shown grid under the next free frame id.
            firstGridIt->second.clear();
            gridMats.emplace((--gridMats.end())->first + 1, firstGridIt->second);
            gridMats.erase(firstGridIt);
        }
        context.drawersContext.drawerMutex.unlock();
    }

    // Draw all aggregated detections/classifications onto the frame and
    // forward it to the Drawer.
    void ResAggregator::process() {
        Context &context = dynamic_cast<ReborningVideoFrame *>(sharedVideoFrame.get())->context;
        // Account for detector requests currently back in the free pool
        // (feeds the usage bar drawn by Drawer::process()).
        context.freeDetectionInfersCount += context.detectorsInfers.inferRequests.lockedSize();
        context.frameCounter++;
        // The FLAGS_no_show option was hard-coded to false; the else branch
        // below is currently dead code kept for reference.
//        if (!FLAGS_no_show) {
        if (!false) {
            for (const BboxAndDescr &bboxAndDescr : boxesAndDescrs) {
                switch (bboxAndDescr.objectType) {
                    case BboxAndDescr::ObjectType::NONE:
                        // Bare detection (no classifier result): cyan box.
                        cv::rectangle(sharedVideoFrame->frame, bboxAndDescr.rect, {255, 255, 0}, 4);
                        break;
                    case BboxAndDescr::ObjectType::VEHICCLE:
                        // Vehicle: green box with attribute text inside it.
                        cv::rectangle(sharedVideoFrame->frame, bboxAndDescr.rect, {0, 255, 0}, 4);
                        cv::putText(sharedVideoFrame->frame, bboxAndDescr.descr,
                                    cv::Point{bboxAndDescr.rect.x, bboxAndDescr.rect.y + 35},
                                    cv::FONT_HERSHEY_COMPLEX, 1.3, cv::Scalar(0, 255, 0), 4);
                        break;
                    case BboxAndDescr::ObjectType::PLATE:
                        // License plate: red box with decoded text above it.
                        cv::rectangle(sharedVideoFrame->frame, bboxAndDescr.rect, {0, 0, 255}, 4);
                        cv::putText(sharedVideoFrame->frame, bboxAndDescr.descr,
                                    cv::Point{bboxAndDescr.rect.x, bboxAndDescr.rect.y - 10},
                                    cv::FONT_HERSHEY_COMPLEX, 1.3, cv::Scalar(0, 0, 255), 4);
                        break;
                    default:
                        throw std::exception();  // must never happen
                        break;
                }
            }
            tryPush(context.drawersContext.drawersWorker, std::make_shared<Drawer>(sharedVideoFrame));
        } else {
            if (!context.isVideo) {
                try {
                    std::shared_ptr<Worker>(context.drawersContext.drawersWorker)->stop();
                } catch (const std::bad_weak_ptr &) {}
            }
        }
    }

    // On the first call, pull detector results and split them into vehicle /
    // plate rectangles; afterwards, reserve classifier InferRequests for the
    // pending rectangles. Runs with the worker's task mutex held (see
    // Worker::threadFunc), which makes the reservation logic safe.
    bool DetectionsProcessor::isReady() {
        Context &context = dynamic_cast<ReborningVideoFrame *>(sharedVideoFrame.get())->context;
        if (requireGettingNumberOfDetections) {
            classifiersAggreagator = std::make_shared<ClassifiersAggreagator>(sharedVideoFrame);
            std::list<Detector::Result> results;
            // The raw-results flag (FLAGS_r) was hard-coded away; with the
            // current condition the first branch is taken for video input's
            // frames only when both sub-conditions are false.
//            if (!(FLAGS_r && ((sharedVideoFrame->frameId == 0 && !context.isVideo) || context.isVideo))) {
            if (!(((sharedVideoFrame->frameId == 0 && !context.isVideo) || context.isVideo))) {
                results = context.inferTasksContext.detector.getResults(*inferRequest, sharedVideoFrame->frame.size());
            } else {
                std::ostringstream rawResultsStream;
                results = context.inferTasksContext.detector.getResults(*inferRequest, sharedVideoFrame->frame.size(),
                                                                        &rawResultsStream);
                classifiersAggreagator->rawDetections = rawResultsStream.str();
            }
            // `result` is deliberately taken by value: case 2 mutates the
            // location before storing it.
            for (Detector::Result result : results) {
                switch (result.label) {
                    case 1: {
                        // Vehicle: clip the box to the frame bounds.
                        vehicleRects.emplace_back(
                                result.location & cv::Rect{cv::Point(0, 0), sharedVideoFrame->frame.size()});
                        break;
                    }
                    case 2: {
                        // expanding a bounding box a bit, better for the license plate recognition
                        result.location.x -= 5;
                        result.location.y -= 5;
                        result.location.width += 10;
                        result.location.height += 10;
                        plateRects.emplace_back(
                                result.location & cv::Rect{cv::Point(0, 0), sharedVideoFrame->frame.size()});
                        break;
                    }
                    default:
                        throw std::exception();  // must never happen
                        break;
                }
            }
            // Return the detector request to the free pool.
            context.detectorsInfers.inferRequests.lockedPush_back(*inferRequest);
            requireGettingNumberOfDetections = false;
        }

//        if ((vehicleRects.empty() || FLAGS_m_va.empty()) && (plateRects.empty() || FLAGS_m_lpr.empty())) {
        if ((vehicleRects.empty()) && (plateRects.empty())) {
            // Nothing left to classify: ready so process() can finish.
            return true;
        } else {
            // isReady() is called under mutexes so it is assured that available InferRequests will not be taken, but new InferRequests can come in
            // acquire as many InferRequests as it is possible or needed
            InferRequestsContainer &attributesInfers = context.attributesInfers;
            attributesInfers.inferRequests.mutex.lock();
            const std::size_t numberOfAttributesInferRequestsAcquired = std::min(vehicleRects.size(),
                                                                                 attributesInfers.inferRequests.container.size());
            // Move the reserved requests out of the shared container.
            reservedAttributesRequests.assign(
                    attributesInfers.inferRequests.container.end() - numberOfAttributesInferRequestsAcquired,
                    attributesInfers.inferRequests.container.end());
            attributesInfers.inferRequests.container.erase(
                    attributesInfers.inferRequests.container.end() - numberOfAttributesInferRequestsAcquired,
                    attributesInfers.inferRequests.container.end());
            attributesInfers.inferRequests.mutex.unlock();

            InferRequestsContainer &platesInfers = context.platesInfers;
            platesInfers.inferRequests.mutex.lock();
            const std::size_t numberOfLprInferRequestsAcquired = std::min(plateRects.size(),
                                                                          platesInfers.inferRequests.container.size());
            reservedLprRequests.assign(platesInfers.inferRequests.container.end() - numberOfLprInferRequestsAcquired,
                                       platesInfers.inferRequests.container.end());
            platesInfers.inferRequests.container.erase(
                    platesInfers.inferRequests.container.end() - numberOfLprInferRequestsAcquired,
                    platesInfers.inferRequests.container.end());
            platesInfers.inferRequests.mutex.unlock();
            // Ready only if at least one classifier request was reserved.
            return numberOfAttributesInferRequestsAcquired || numberOfLprInferRequestsAcquired;
        }
    }

    // Dispatch the classifier InferRequests reserved by isReady() for the
    // pending rectangles. Results are gathered by classifiersAggreagator;
    // rectangles that got no request are re-queued as a follow-up task.
    void DetectionsProcessor::process() {
        Context &context = dynamic_cast<ReborningVideoFrame *>(sharedVideoFrame.get())->context;
//        if (!FLAGS_m_va.empty()) {
        if (!m_va.empty()) {
            // Vehicle-attribute model configured: classify as many vehicles
            // as requests were reserved.
            auto vehicleRectsIt = vehicleRects.begin();
            for (auto attributesRequestIt = reservedAttributesRequests.begin();
                 attributesRequestIt != reservedAttributesRequests.end();
                 vehicleRectsIt++, attributesRequestIt++) {
                const cv::Rect vehicleRect = *vehicleRectsIt;
                InferenceEngine::InferRequest &attributesRequest = *attributesRequestIt;
                context.detectionsProcessorsContext.vehicleAttributesClassifier.setImage(attributesRequest,
                                                                                         sharedVideoFrame->frame,
                                                                                         vehicleRect);

                // The callback owns a shared_ptr to the aggregator, keeping
                // it (and the frame) alive until the last result arrives.
                attributesRequest.SetCompletionCallback(
                        std::bind(
                                [](std::shared_ptr<ClassifiersAggreagator> classifiersAggreagator,
                                   InferenceEngine::InferRequest &attributesRequest,
                                   cv::Rect rect,
                                   Context &context) {
                                    attributesRequest.SetCompletionCallback([] {});  // destroy the stored bind object

                                    const std::pair<std::string, std::string> &attributes
                                            = context.detectionsProcessorsContext.vehicleAttributesClassifier.getResults(
                                                    attributesRequest);

                                    // Raw-results flag hard-coded off: this
                                    // branch is currently dead code.
//                                    if (FLAGS_r &&
                                    if (false &&
                                        ((classifiersAggreagator->sharedVideoFrame->frameId == 0 && !context.isVideo) ||
                                         context.isVideo)) {
                                        classifiersAggreagator->rawAttributes.lockedPush_back(
                                                "Vehicle Attributes results:" + attributes.first + ';'
                                                + attributes.second + '\n');
                                    }
                                    classifiersAggreagator->push(BboxAndDescr{BboxAndDescr::ObjectType::VEHICCLE, rect,
                                                                              attributes.first + ' ' +
                                                                              attributes.second});
                                    // Return the request to the free pool.
                                    context.attributesInfers.inferRequests.lockedPush_back(attributesRequest);
                                }, classifiersAggreagator,
                                std::ref(attributesRequest),
                                vehicleRect,
                                std::ref(context)));

                attributesRequest.StartAsync();
            }
            // Drop only the rectangles actually dispatched; leftovers are
            // re-queued at the bottom of this function.
            vehicleRects.erase(vehicleRects.begin(), vehicleRectsIt);
        } else {
            // No attribute model: report bare detections.
            for (const cv::Rect vehicleRect : vehicleRects) {
                classifiersAggreagator->push(BboxAndDescr{BboxAndDescr::ObjectType::NONE, vehicleRect, ""});
            }
            vehicleRects.clear();
        }

        // Same scheme for license plates.
        if (!m_lpr.empty()) {
            auto plateRectsIt = plateRects.begin();
            for (auto lprRequestsIt = reservedLprRequests.begin();
                 lprRequestsIt != reservedLprRequests.end(); plateRectsIt++, lprRequestsIt++) {
                const cv::Rect plateRect = *plateRectsIt;
                InferenceEngine::InferRequest &lprRequest = *lprRequestsIt;
                context.detectionsProcessorsContext.lpr.setImage(lprRequest, sharedVideoFrame->frame, plateRect);

                lprRequest.SetCompletionCallback(
                        std::bind(
                                [](std::shared_ptr<ClassifiersAggreagator> classifiersAggreagator,
                                   InferenceEngine::InferRequest &lprRequest,
                                   cv::Rect rect,
                                   Context &context) {
                                    lprRequest.SetCompletionCallback([] {});  // destroy the stored bind object

                                    std::string result = context.detectionsProcessorsContext.lpr.getResults(lprRequest);

                                    // Raw-results flag hard-coded off: this
                                    // branch is currently dead code.
//                                    if (FLAGS_r &&
                                    if (false &&
                                        ((classifiersAggreagator->sharedVideoFrame->frameId == 0 && !context.isVideo) ||
                                         context.isVideo)) {
                                        classifiersAggreagator->rawDecodedPlates.lockedPush_back(
                                                "License Plate Recognition results:" + result + '\n');
                                    }
                                    classifiersAggreagator->push(
                                            BboxAndDescr{BboxAndDescr::ObjectType::PLATE, rect, std::move(result)});
                                    context.platesInfers.inferRequests.lockedPush_back(lprRequest);
                                }, classifiersAggreagator,
                                std::ref(lprRequest),
                                plateRect,
                                std::ref(context)));

                lprRequest.StartAsync();
            }
            plateRects.erase(plateRects.begin(), plateRectsIt);
        } else {
            for (const cv::Rect &plateRect : plateRects) {
                classifiersAggreagator->push(BboxAndDescr{BboxAndDescr::ObjectType::NONE, plateRect, ""});
            }
            plateRects.clear();
        }
        // Rectangles without a reserved request get a follow-up task, whose
        // own isReady() will reserve more requests as they free up.
        if (!vehicleRects.empty() || !plateRects.empty()) {
            tryPush(context.detectionsProcessorsContext.detectionsProcessorsWorker,
                    std::make_shared<DetectionsProcessor>(sharedVideoFrame, std::move(classifiersAggreagator),
                                                          std::move(vehicleRects), std::move(plateRects)));
        }
    }

    // Ready when a detector InferRequest is available.
    bool InferTask::isReady() {
        InferRequestsContainer &detectorsInfers = dynamic_cast<ReborningVideoFrame *>(sharedVideoFrame.get())->context.detectorsInfers;
        // Fast path: racy empty() check avoids taking the mutex when no
        // request is available.
        if (detectorsInfers.inferRequests.container.empty()) {
            return false;
        } else {
            detectorsInfers.inferRequests.mutex.lock();
            // Re-check under the lock (double-checked pattern).
            if (detectorsInfers.inferRequests.container.empty()) {
                detectorsInfers.inferRequests.mutex.unlock();
                return false;
            } else {
                // Deliberately return with the mutex HELD: process() pops a
                // request and then unlocks. isReady()/process() run
                // back-to-back on the same worker thread.
                return true;  // process() will unlock the mutex
            }
        }
    }

    // Start detection on this frame.
    // Entered with detectorsInfers.inferRequests.mutex HELD by isReady().
    void InferTask::process() {
        Context &context = dynamic_cast<ReborningVideoFrame *>(sharedVideoFrame.get())->context;
        InferRequestsContainer &detectorsInfers = context.detectorsInfers;
        std::reference_wrapper<InferenceEngine::InferRequest> inferRequest = detectorsInfers.inferRequests.container.back();
        detectorsInfers.inferRequests.container.pop_back();
        detectorsInfers.inferRequests.mutex.unlock();

        context.inferTasksContext.detector.setImage(inferRequest, sharedVideoFrame->frame);

        // On completion, hand the frame plus this request over to a
        // DetectionsProcessor task.
        inferRequest.get().SetCompletionCallback(
                std::bind(
                        [](VideoFrame::Ptr sharedVideoFrame,
                           InferenceEngine::InferRequest &inferRequest,
                           Context &context) {
                            inferRequest.SetCompletionCallback([] {});  // destroy the stored bind object
                            tryPush(context.detectionsProcessorsContext.detectionsProcessorsWorker,
                                    std::make_shared<DetectionsProcessor>(sharedVideoFrame, &inferRequest));
                        }, sharedVideoFrame,
                        inferRequest,
                        std::ref(context)));
        inferRequest.get().StartAsync();
        // do not push as callback does it
    }

    // Enforce in-order capture per source: only the frame whose id directly
    // follows the last captured one may proceed.
    bool Reader::isReady() {
        Context &context = dynamic_cast<ReborningVideoFrame *>(sharedVideoFrame.get())->context;
        context.readersContext.lastCapturedFrameIdsMutexes[sharedVideoFrame->sourceID].lock();
        if (context.readersContext.lastCapturedFrameIds[sharedVideoFrame->sourceID] + 1 == sharedVideoFrame->frameId) {
            // Deliberately keep the per-source mutex LOCKED: process()
            // unlocks it after bumping lastCapturedFrameIds.
            return true;
        } else {
            context.readersContext.lastCapturedFrameIdsMutexes[sharedVideoFrame->sourceID].unlock();
            return false;
        }
    }

    // Capture the next frame for this source.
    // Entered with this source's lastCapturedFrameIds mutex HELD (see
    // isReady()).
    void Reader::process() {
        unsigned sourceID = sharedVideoFrame->sourceID;
        Context &context = dynamic_cast<ReborningVideoFrame *>(sharedVideoFrame.get())->context;
        const std::vector<std::shared_ptr<InputChannel>> &inputChannels = context.readersContext.inputChannels;
        if (inputChannels[sourceID]->read(sharedVideoFrame->frame)) {
            context.readersContext.lastCapturedFrameIds[sourceID]++;
            context.readersContext.lastCapturedFrameIdsMutexes[sourceID].unlock();
            // Frame captured: send it on to detection.
            tryPush(context.inferTasksContext.inferTasksWorker, std::make_shared<InferTask>(sharedVideoFrame));
        } else {
            context.readersContext.lastCapturedFrameIds[sourceID]++;
            context.readersContext.lastCapturedFrameIdsMutexes[sourceID].unlock();
            // Source exhausted: shut the drawers worker down (unless it is
            // already destroyed).
            try {
                std::shared_ptr<Worker>(context.drawersContext.drawersWorker)->stop();
            } catch (const std::bad_weak_ptr &) {}
        }
    }
}

